跳到主要内容

Flink CDC相关实践

官方提供

Flink CDC构建MySQL和Postgres上的 Streaming ETL

Flink CDC 实现 MySQL 数据实时入 Apache Doris 数仓

基于 Flink CDC 同步 MySQL 分库分表,构建 Iceberg实时数据湖

项目实战

知识准备

查看数据开发中的Apache Flink End-to-End-Exactly-Once

支持并发读取的 DataStream API 在 2.0 版本中,无锁算法,并发读取等功能只在 SQL API 上透出给用户,而 DataStream API 未透出给用户,2.1 版本支持了 DataStream API,可通过 MySqlSourceBuilder 创建数据源。用户可以同时捕获多表数据,借此搭建整库同步链路。同时通过 MySqlSourceBuilder#includeSchemaChanges 还能捕获schema 变更。

在 2.2 版本中MySQL CDC 支持动态加表 设想下假如你一个 CDC pipeline 监控了 4 张表,突然有天业务需求需要再加几张表,你肯定不想另起作业 (浪费资源),那么这个 feature 可以让你在当前作业直接增加需要监控的表。新增表都是先做全量再优雅地切换到增量,遇到新增监控表时不用新起作业,极大地节约了资源。Flink CDC 网站也提供了该功能的使用文档[2],用户在开发时可以参考。

由上可知,Flink CDC 2.0开始可以支持搭建整库同步链路,在2.2 版本支持动态加表,那么我们来实现一个动态加表的整库同步到Kafka案例,使用这个案例直接使用最近发布的2.4版本作为基础来实现。

需求

  1. MySQL整库的changelog数据同步到Kafka,数据不丢不重。

  2. 支持动态的加入新表,加入以后能够在不停加的情况,新表的数据发送到Kafka。

  3. 数据发送到Kafka的单行数据的changelog不乱序。

  4. 可配置化。

需求分析

kafka生产者图片

需求实现思路

需求解决方案
MySQL整库的changelog数据同步到Kafka,数据不丢不重。Flink CDC能够保证读取的时候不丢不重复,那么只要保证发送到Kafka的时候不丢就行了(具体原理和操作查看数据开发Apache Flink 的 End-to-End-Exactly-Once)。
支持动态的加入新表,加入以后能够在不停加的情况,新表的数据发送到Kafka。Flink CDC正则mysql_db=dbname mysql_tables=dbname.*,就能够实现动态表的监听不停机操作。
数据发送到Kafka的单行数据的changelog不乱序。一张表对应一个topic,并且单行数据的crud操作的key用[tablename_id] 这样就能够使得单行数据到一个主题的一个分区,实现了数据不乱序。
可配置化。ParameterTool parameter = ParameterTool.fromArgs(args);把程序所有的变量抽象处理实现外部可配置化,达到开发接口实现整库同步的效果。

实现的难点

Flink CDC 本身的序列化器是没有包含数据的主键的,那么得到主键的主键才能保证单行数据发送到Kafka数据不乱序,比如[tablename_id],刚好下面伪代码可以得到对应的主键信息。

 com.ververica.cdc.debezium.DebeziumDeserializationSchema
public void deserialize(SourceRecord record, Collector<String> out)
Schema sourceKeySchema = record.keySchema();
List<Field> fields = sourceKeySchema.fields();
//得到数据的表名
String tableName = initDataInfo.getJSONObject("source").getString("table");
resultPrimaryKey.append(tableName);
//得到数据的主键数据
for (Field field : fields) {
String primaryKeyName = field.name();
String primaryKeyValueItem="";
if(initDataInfo.getJSONObject("after")!=null){
primaryKeyValueItem= initDataInfo.getJSONObject("after").getString(primaryKeyName);
}else{
primaryKeyValueItem=initDataInfo.getJSONObject("before").getString(primaryKeyName);
}
resultPrimaryKey.append(primaryKeyValueItem);
}

//上面实现的FlinkCDC序列化器装载进来。
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname(mysql_server)
.port(Integer.parseInt(mysql_port))
.scanNewlyAddedTableEnabled(true)
.databaseList(mysql_db) // set captured database, If you need to synchronize the whole database, Please set tableList to ".*".
.tableList(mysql_tables) // set captured table
.username(mysql_username)
.password(mysql_password)
// .serverTimeZone(mysql_timezone)
.startupOptions(StartupOptions.initial())
.includeSchemaChanges(true)
.deserializer(new CustomFlinkCDCJsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.build();

最后得到的结果数据为,如下面的数据多了一个data_key和data_target_topic,这里加进去的主要作用是在Flink Kafka Product的时候能够知道发送到了那个主题,和对应的分区做好数据准备。

{
"op": "r",
"data_key": "tablename_id",
"data_target_topic": "databasename_tablename",
"after": {
"test1": "数据"
"id": 89
},
"source": {
"server_id": 0,
"version": "1.3",
"file": "",
"connector": "mysql",
"pos": 0,
"name": "mysql_binlog_source",
"row": 0,
"ts_ms": 0,
"snapshot": "false",
"db": "库名",
"table": "表名"
},
"ts_ms": 1691472223325
}

KafkaSink,自定义一些主题选择器,KeySerialization和ValueSerialization对之前的数据进行处理得到最初的Flink CDC 采集到的数据。

        Properties properties = new Properties();
properties.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,CommonString.CHECKPOINT_TIMEOUT+CommonString.CHECKPOINT_TIME*2);
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "2000");
KafkaSink sink = KafkaSink.<String>builder()
.setBootstrapServers(kafka_servsers)
//自定义序列化器
.setRecordSerializer(CustomKafkaRecordSerializationSchema.buildCustomKafkaRecordSerializationSchema())
.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix(kafka_transactionalIdprefix)
.setKafkaProducerConfig(properties)
.build();
env
.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "CustomMySQL Source")
.setParallelism(3).sinkTo(sink);

自定义序列化器代码

/**
* Selects a topic for the incoming record.
*
* @param <IN> type of the incoming record
*/
@PublicEvolving
public interface TopicSelector<IN> extends Function<IN, String>, Serializable {}


public class CustomKafkaRecordSerializationSchema {
public static KafkaRecordSerializationSchema buildCustomKafkaRecordSerializationSchema(){
return KafkaRecordSerializationSchema.builder()
.setTopicSelector(new TopicSelector<String>() {
@Override
public String apply(String initData) {
JSONObject jsonObject = JSONObject.parseObject(initData);
String targetTopic = jsonObject.getString("topic");
return targetTopic;
}
})
.setValueSerializationSchema(new CustomFlinkKafkaSimpleStringValueSchema())
.setKeySerializationSchema(new CustomFlinkKafkaSimpleStringKeySchema())
.setPartitioner(new FlinkKafkaPartitioner<String>() {
@Override
public int partition(String record, byte[] key, byte[] value, String topic, int[] partitions) {
String keyString = new String(key, StandardCharsets.UTF_8);
int keyHashCode = keyString.hashCode();
//确保指定的分区不为负数
int resHashNum=keyHashCode&0x7FFFFFFF;
return resHashNum%partitions.length;
}
})
.build();
}
}
CustomFlinkKafkaSimpleStringKeySchema
public byte[] serialize(String element) {
JSONObject jsonObject = JSONObject.parseObject(element);
String data_key = jsonObject.getString("data_key");
return data_key.getBytes();
}
CustomFlinkKafkaSimpleStringValueSchema
public byte[] serialize(String element) {
JSONObject jsonObject = JSONObject.parseObject(element);
jsonObject.remove("data_key");
jsonObject.remove("data_target_topic");
element=jsonObject.toJSONString();
return element.getBytes(this.charset);
}

经过处理最后得到Flink CDC最原始的changelog数据,并且保证了数据的整库同步,不丢不重复,动态加表,单行数据在Kafka不乱序(单行数据都根据自己的主键都发送到一个分区)的需求。

{
"op": "r",
"after": {
"test1": "数据"
"id": 89
},
"source": {
"server_id": 0,
"version": "1.3",
"file": "",
"connector": "mysql",
"pos": 0,
"name": "mysql_binlog_source",
"row": 0,
"ts_ms": 0,
"snapshot": "false",
"db": "库名",
"table": "表名"
},
"ts_ms": 1691472223325
}